﻿#ifndef __OFFLINE_FIFO_H__
#define __OFFLINE_FIFO_H__

#define HAS_LZ4
#define HAS_UTILS_FS

#include <stdint.h>
#include <stdio.h>

#include <string>
#include <deque>
#include <thread>
#include <vector>

#include <GSLAM/core/SPtr.h>
#include <GSLAM/core/Svar.h>
#include <GSLAM/core/Timer.h>
#include <GSLAM/core/Mutex.h>

#ifdef HAS_UTILS_FS
#include <GSLAM/core/Utils.h>
#endif

#ifdef HAS_LZ4
#include <GSLAM/core/Utils_LZ4.h>
#endif

typedef std::lock_guard<GSLAM::Mutex> ScopedLock;


/**
 * Offline FIFO class
 *
 * This class will automatically save buffer item to disk,
 * when the queue size execeed given `writeThreshold`
 * When pop the item in the queue, this class will automaticall load save data from disk
 *
 */
class OfflineFifo
{
protected:
    struct Item
    {
        uint8_t*        buf;                // data
        uint64_t        len;                // data length

        int             id;                 // id number
        int             offline;            // 0 - not offline (in memory),
                                            // 1 - offline (save to disk)
    };

    typedef SPtr<Item> SItem;

public:
    /**
     * @brief OfflineFifo
     *
     * @param tempPath              [in] temp buffer dir name and file prefix name
     * @param writeThreshold        [in] when queue size exceed the value,
     *                                   new push item will be save to disk
     */
    OfflineFifo(const std::string& tempPath = "./ob", int writeThreshold=20)
        :_tempPath(tempPath), _writeThreshold(writeThreshold), _uid(0), _shouldStop(false), _readAhead(10)
    {
        // max queue size
        _maxQueueSize = 1000;

        // start write/read threads
        _threadWrite = std::thread(&OfflineFifo::offlineWrite, this);
        _threadRead  = std::thread(&OfflineFifo::offlineRead,  this);

        // create temp dir if not exist
        createTemps();
    }

    ~OfflineFifo()
    {
        stop();
        removeTemps();

        _buff.clear();
        _buffWrite.clear();
    }


    int push(uint8_t* buf, uint64_t len, bool bufcopy=false)
    {
        SItem item = SItem(new Item());
        item->id = _uid++;
        item->offline = 0;

        if( bufcopy )
        {
            item->buf = (uint8_t*) malloc(len);
            memcpy(item->buf, buf, len);
        }
        else
        {
            item->buf = buf;
        }
        item->len = len;

        // push to FIFO
        {
            ScopedLock mu(_mutexQueue);

            _buff.push_back(item);
        }

        // if FIFO exceed given size, then backup data to file
        if( _buff.size() > _writeThreshold )
        {
            ScopedLock mu2(_mutexWrite);

            _buffWrite.push_back(item);
        }

        // if queue size is too large, then remove some items
        if( _buff.size() > _maxQueueSize )
        {
            while( _buff.size() > _maxQueueSize )
            {
                uint8_t *buf = NULL;
                uint64_t buflen;
                pop(&buf, &buflen);

                free(buf);
            }
        }

        return 0;
    }


    int pop(uint8_t** buf, uint64_t* len)
    {
        SItem item;
        ScopedLock mu(_mutexQueue);

        // get first item
        if( _buff.size() > 0 )
        {
            item = _buff.front();
            _buff.pop_front();
        }
        else
        {
            *len = 0;
            return -1;
        }

        if( item->offline ) itemLoad(item);

        if( *buf != NULL ) free(*buf);

        *buf = item->buf;
        *len = item->len;

        return 0;
    }

    /**
     * @brief get the current queue size
     *
     * @return queue item number
     */
    int size(void)
    {
        ScopedLock mu(_mutexQueue);

        return _buff.size();
    }

    /**
     * @brief stop the FIFO system
     *
     * @return
     */
    int stop(void)
    {
        _shouldStop = true;
        while( !_threadRead.joinable() || !_threadWrite.joinable() )
            GSLAM::Rate::sleep(0.01);

        _threadWrite.join();
        _threadRead.join();

        return 0;
    }

    /**
     * @brief setQueueMaxSize
     * @param maxsize           - [in] queue size
     * @return
     */
    int setQueueMaxSize(int maxsize)
    {
        _maxQueueSize = maxsize;
    }

    /**
     * @brief getQueueMaxSize
     * @return
     */
    int getQueueMaxSize(void)
    {
        return _maxQueueSize;
    }


    /**
     * @brief Create temp dir
     *
     * @return
     */
    int createTemps(void)
    {
#ifdef HAS_UTILS_FS
        std::string p = GSLAM::utils::path_getPathName(_tempPath);
        if( p.size() > 0 ) GSLAM::utils::path_mkdir(p);
#endif

        return 0;
    }

    /**
     * @brief Remove all temp files & directory
     *
     * @return
     */
    int removeTemps(void)
    {
        // remove all files in temp data dir
#ifdef HAS_UTILS_FS
        std::string p = GSLAM::utils::path_getPathName(_tempPath);
        if( p.size() > 0 )
        {
            GSLAM::utils::path_rmdir(p);
            GSLAM::utils::path_mkdir(p);
        }
#endif

        return 0;
    }


protected:
    int offlineWrite(void)
    {
        GSLAM::Rate r(100);

        while( !_shouldStop )
        {
            while( _buffWrite.size() > 0 )
            {
                // get a new item
                SItem it;
                {
                    ScopedLock mu(_mutexWrite);

                    it = _buffWrite.front();
                    _buffWrite.pop_front();
                }

                // save to disk if necessary
                itemSave(it);

                // free memory and set offline flag
                free(it->buf);
                it->buf = NULL;
                it->offline = 1;
            }

            r.sleep();
        }

        return 0;
    }

    int offlineRead(void)
    {
        GSLAM::Rate r(100);

        while( !_shouldStop )
        {
            for(int i=0; i<_readAhead; i++)
            {
                SItem it;
                ScopedLock mu(_mutexQueue);

                if( i < _buff.size() )
                {
                    it = _buff[i];
                }
                else
                {
                    continue;
                }

                if( it->offline )
                {
                    itemLoad(it);
                    it->offline = 0;
                }
            }

            r.sleep();
        }

        return 0;
    }

    int itemSave(SItem& it)
    {
        char fname[2048];

        sprintf(fname, "%s_%09d", _tempPath.c_str(), it->id);
        FILE *fp = fopen(fname, "wb+");
        if( fp == NULL )
        {
            fprintf(stderr, "Can not open file for write: %s\n", fname);
            createTemps();
            return -1;
        }

#ifdef HAS_LZ4
        uint64_t len_ori, len_comp;
        char *buf_comp;

        len_ori = it->len;

        buf_comp = (char*) malloc(len_ori);
        len_comp = LZ4_compress_default((char*)it->buf, buf_comp, len_ori, len_ori);

        if( len_comp < 1 )
        {
            // if compressed failed, then store original data
            len_comp = len_ori;

            fwrite(&len_ori,   sizeof(len_ori),  1, fp);
            fwrite(&len_comp,  sizeof(len_comp), 1, fp);
            fwrite(it->buf,    len_ori,          1, fp);
        }
        else
        {
            fwrite(&len_ori,   sizeof(len_ori),  1, fp);
            fwrite(&len_comp,  sizeof(len_comp), 1, fp);
            fwrite(buf_comp,   len_comp,         1, fp);
        }


        free(buf_comp);
#else
        uint64_t bs = it->len;
        fwrite(&bs, sizeof(bs), 1, fp);
        fwrite(it->buf, bs, 1, fp);
#endif

        fclose(fp);

        it->offline = 1;

        return 0;
    }

    int itemLoad(SItem& it)
    {
        char fname[2048];

        // generate temp filename
        sprintf(fname, "%s_%09d", _tempPath.c_str(), it->id);
        FILE *fp = fopen(fname, "rb");
        if( fp == NULL )
        {
            fprintf(stderr, "Can not open file for read: %s\n", fname);
            return -1;
        }

#ifdef HAS_LZ4
        uint64_t len_comp, len_ori, nread, ndecomp;
        char *buf_comp;

        nread = fread(&len_ori,  sizeof(len_ori),  1, fp);
        nread = fread(&len_comp, sizeof(len_comp), 1, fp);

        if( it->buf != NULL) free(it->buf);
        it->buf = (uint8_t*) malloc(len_ori);

        if( len_ori == len_comp )
        {
            nread = fread(it->buf, len_ori, 1, fp);
        }
        else
        {
            buf_comp = (char*) malloc(len_comp);
            nread = fread(buf_comp, len_comp, 1, fp);

            ndecomp = LZ4_decompress_safe(buf_comp, (char*)it->buf, len_comp, len_ori);

            free(buf_comp);
        }

#else
        // read data
        uint64_t bs = 0, nread;
        nread = fread(&bs, sizeof(bs), 1, fp);
        it->len = bs;

        if( it->buf != NULL) free(it->buf);
        it->buf = (uint8_t*) malloc(bs);
        nread = fread(it->buf, bs, 1, fp);
#endif

        fclose(fp);

        // remove temp file
#ifdef HAS_UTILS_FS
        GSLAM::utils::path_rmfile(fname);
#endif

        // set offline to off
        it->offline = 0;

        return 0;
    }


private:
    std::string             _tempPath;
    int                     _writeThreshold, _readAhead;
    int                     _maxQueueSize;
    mutable int             _uid;

    std::deque<SItem>        _buff, _buffWrite;

    bool                    _shouldStop;
    std::thread             _threadWrite, _threadRead;
    GSLAM::Mutex            _mutexQueue, _mutexWrite;
};


#endif // end of __OFFLINE_FIFO_H__

